{ "cells": [ { "cell_type": "markdown", "id": "f424385e", "metadata": {}, "source": [ "# Dask + Zarr, but Remote!\n", "\n", "**Author**: Ilan Gold" ] }, { "cell_type": "markdown", "id": "1201f1a0", "metadata": {}, "source": [ "To begin we need to create a dataset on disk to be used with `dask` in the `zarr` format. We will edit the `chunk_size` argument so that we make fetching expression data for groups of cells more efficient i.e., each access-per-gene over a contiguous group of cells (within the `obs` ordering) will be fast and efficient." ] }, { "cell_type": "code", "execution_count": 1, "id": "440aa8fd", "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "import dask.array as da\n", "import zarr\n", "\n", "from anndata.experimental import read_dispatched, write_dispatched, read_elem\n", "import scanpy as sc" ] }, { "cell_type": "code", "execution_count": 2, "id": "9c2fa9e1", "metadata": {}, "outputs": [], "source": [ "rel_zarr_path = 'data/pbmc3k_processed.zarr'" ] }, { "cell_type": "code", "execution_count": 3, "id": "9e90c05f", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata = sc.datasets.pbmc3k_processed()\n", "adata.write_zarr(f'./{rel_zarr_path}', chunks=[adata.shape[0], 5])\n", "zarr.consolidate_metadata(f'./{rel_zarr_path}')" ] }, { "cell_type": "code", "execution_count": 4, "id": "4537ae1a", "metadata": {}, "outputs": [], "source": [ "def read_dask(store):\n", " f = zarr.open(store, mode=\"r\")\n", "\n", " def callback(func, elem_name: str, elem, iospec):\n", " if iospec.encoding_type in (\n", " \"dataframe\",\n", " \"csr_matrix\",\n", " \"csc_matrix\",\n", " \"awkward-array\",\n", " ):\n", " # Preventing recursing inside of these types\n", " return read_elem(elem)\n", " elif iospec.encoding_type == \"array\":\n", " return da.from_zarr(elem)\n", " else:\n", " return func(elem)\n", "\n", " adata = read_dispatched(f, callback=callback)\n", "\n", " return adata" ] }, { "cell_type": "markdown", "id": "3c7e8165", "metadata": {}, "source": [ "Before continuing, go to a shell and run `python3 -m http.server 8080` out of the directory containing this notebook. This will allow you to observe how different requests are handled by a file server. After this, run the next cell to load the data via the server, using dask arrays \"over the wire\" - note that this functionality is enabled by `dask`'s deep integration with `zarr`, not `hdf5`!" ] }, { "cell_type": "code", "execution_count": 5, "id": "fd9d864b", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 18.50 MiB 51.52 kiB
Shape (2638, 1838) (2638, 5)
Dask graph 368 chunks in 2 graph layers
Data type float32 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 1838\n", " 2638\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata_dask = read_dask(f'http://127.0.0.1:8080/{rel_zarr_path}')\n", "adata_dask.X" ] }, { "cell_type": "code", "execution_count": 6, "id": "5db37304", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 41.22 kiB 41.22 kiB
Shape (2638, 2) (2638, 2)
Dask graph 1 chunks in 2 graph layers
Data type float64 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 2\n", " 2638\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata_dask.obsm['X_draw_graph_fr']" ] }, { "cell_type": "markdown", "id": "3c19ba55", "metadata": {}, "source": [ "Now let's make some requests - slicing over the `obs` axis should be efficient." ] }, { "cell_type": "code", "execution_count": 7, "id": "d724bd2a", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[-0.4751688 ],\n", " [-0.68339145],\n", " [-0.52097213],\n", " ...,\n", " [-0.40973732],\n", " [-0.35466102],\n", " [-0.42529213]], dtype=float32)" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata_dask.X[:, adata.var.index == 'C1orf86'].compute()" ] }, { "cell_type": "markdown", "id": "9ab64b9b", "metadata": {}, "source": [ "Indeed, you should only have one additional request now, which looks something like this:\n", "\n", "```\n", "::ffff:127.0.0.1 - - [13/Feb/2023 20:00:36] \"GET /data/pbmc3k_processed.zarr/X/0.0 HTTP/1.1\" 200 -\n", "```\n", "\n", "What about over multiple genes? `adata.var['n_cells'] > 1000 == 59` so this should be less than 59 requests (indeed there are)!" ] }, { "cell_type": "code", "execution_count": 8, "id": "68348d05", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[ 0.53837276, -0.862139 , -1.1624558 , ..., 0.02576654,\n", " -0.7214901 , -0.86157244],\n", " [-0.39546633, -1.4468503 , -0.23953451, ..., -1.8439665 ,\n", " -0.95835304, -0.04634313],\n", " [ 1.036884 , -0.82907706, 0.13356175, ..., -0.91740227,\n", " 1.2407869 , -0.95057184],\n", " ...,\n", " [ 0.9374183 , -0.63782793, 1.4828881 , ..., -0.74470884,\n", " 1.4084249 , 1.8403655 ],\n", " [ 1.4825792 , -0.48758882, 1.2520502 , ..., -0.54854494,\n", " -0.61547786, -0.68133515],\n", " [ 1.2934785 , 1.2127419 , 1.2300901 , ..., -0.5996045 ,\n", " 1.1535971 , -0.8018701 ]], dtype=float32)" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata_dask.X[:, adata.var['n_cells'] > 1000].compute()" ] }, { "cell_type": "markdown", "id": "e4bbb07e", "metadata": {}, "source": [ "Now what if we chunk differently, larger? There should be fewer requests made to the server, although now each request will be larger - a tradeoff that needs to be tailored to each use-case!" ] }, { "cell_type": "code", "execution_count": 9, "id": "8ca0e661", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([[ 0.53837276, -0.862139 , -1.1624558 , ..., 0.02576654,\n", " -0.7214901 , -0.86157244],\n", " [-0.39546633, -1.4468503 , -0.23953451, ..., -1.8439665 ,\n", " -0.95835304, -0.04634313],\n", " [ 1.036884 , -0.82907706, 0.13356175, ..., -0.91740227,\n", " 1.2407869 , -0.95057184],\n", " ...,\n", " [ 0.9374183 , -0.63782793, 1.4828881 , ..., -0.74470884,\n", " 1.4084249 , 1.8403655 ],\n", " [ 1.4825792 , -0.48758882, 1.2520502 , ..., -0.54854494,\n", " -0.61547786, -0.68133515],\n", " [ 1.2934785 , 1.2127419 , 1.2300901 , ..., -0.5996045 ,\n", " 1.1535971 , -0.8018701 ]], dtype=float32)" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata.write_zarr(f'./{rel_zarr_path}', chunks=[adata.shape[0], 25])\n", "zarr.consolidate_metadata(f'./{rel_zarr_path}')\n", "adata_dask = read_dask(f'http://127.0.0.1:8080/{rel_zarr_path}')\n", "\n", "adata_dask.X[:, adata.var['n_cells'] > 1000].compute()" ] }, { "cell_type": "markdown", "id": "a41a0d18", "metadata": {}, "source": [ "Now what if we had a `layer` that we wanted to chunk in a custom way, e.g. chunked across all cells by gene)? Just use `write_dispatched` as we did with `read_dispatched`!" ] }, { "cell_type": "code", "execution_count": 10, "id": "dc1aa5c5", "metadata": {}, "outputs": [], "source": [ "adata.layers['scaled'] = adata.X.copy()\n", "sc.pp.scale(adata, layer='scaled')" ] }, { "cell_type": "code", "execution_count": 11, "id": "333300f3", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def write_chunked(func, store, k, elem, dataset_kwargs, iospec):\n", " \"\"\"Write callback that chunks X and layers\"\"\"\n", "\n", " def set_chunks(d, chunks=None):\n", " \"\"\"Helper function for setting dataset_kwargs. Makes a copy of d.\"\"\"\n", " d = dict(d)\n", " if chunks is not None:\n", " d[\"chunks\"] = chunks\n", " else:\n", " d.pop(\"chunks\", None) \n", " return d\n", "\n", " if iospec.encoding_type == \"array\":\n", " if 'layers' in k or k.endswith('X'):\n", " dataset_kwargs = set_chunks(dataset_kwargs, (adata.shape[0], 25))\n", " else:\n", " dataset_kwargs = set_chunks(dataset_kwargs, None)\n", "\n", " func(store, k, elem, dataset_kwargs=dataset_kwargs)\n", "\n", "output_zarr_path = \"data/pbmc3k_scaled.zarr\"\n", "z = zarr.open_group(output_zarr_path)\n", "\n", "write_dispatched(z, \"/\", adata, callback=write_chunked)\n", "zarr.consolidate_metadata(f'./{rel_zarr_path}')" ] }, { "cell_type": "code", "execution_count": 12, "id": "4e182800", "metadata": {}, "outputs": [], "source": [ "adata_dask = read_dask(f'http://127.0.0.1:8080/{output_zarr_path}')" ] }, { "cell_type": "code", "execution_count": 13, "id": "530eae20", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Array Chunk
Bytes 18.50 MiB 257.62 kiB
Shape (2638, 1838) (2638, 25)
Dask graph 74 chunks in 2 graph layers
Data type float32 numpy.ndarray
\n", "
\n", " \n", "\n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", "\n", " \n", " 1838\n", " 2638\n", "\n", "
" ], "text/plain": [ "dask.array" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "adata_dask.layers['scaled']" ] } ], "metadata": { "kernelspec": { "display_name": "Python3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 5 }